{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<h1> Preprocessing using tf.transform and Dataflow </h1>\n",
    "\n",
    "This notebook illustrates:\n",
    "<ol>\n",
    "<li> Creating datasets for Machine Learning using tf.transform and Dataflow\n",
    "</ol>\n",
    "<p>\n",
    "While Pandas is fine for experimenting, for operationalization of your workflow, it is better to do preprocessing in Apache Beam. This will also help if you need to preprocess data in flight, since Apache Beam also allows for streaming.\n",
    "<p>\n",
    "Only specific combinations of TensorFlow/Beam are supported by tf.transform. So make sure to get a combo that is.\n",
    "* TFT 0.3 \n",
    "* TF 1.3\n",
    "* Apache Beam [GCP] 2.1.1"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false,
    "jupyter": {
     "outputs_hidden": false
    }
   },
   "outputs": [],
   "source": [
    "%bash\n",
    "pip uninstall -y google-cloud-dataflow\n",
    "pip install --upgrade --force tensorflow_transform==0.1.10 apache-beam[gcp]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": false,
    "jupyter": {
     "outputs_hidden": false
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "apache-beam==2.1.1\n",
      "tensorflow==1.2.0\n",
      "tensorflow-transform==0.1.10\n"
     ]
    }
   ],
   "source": [
    "%bash\n",
    "pip freeze | grep -e 'flow\\|beam'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": false,
    "jupyter": {
     "outputs_hidden": false
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "1.2.0\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "No handlers could be found for logger \"oauth2client.contrib.multistore_file\"\n",
      "/usr/local/lib/python2.7/dist-packages/simplejson/encoder.py:291: DeprecationWarning: Interpreting naive datetime as local 2017-10-15 18:39:44.433221. Please add timezone info to timestamps.\n",
      "  chunks = self.iterencode(o, _one_shot=True)\n"
     ]
    }
   ],
   "source": [
    "import tensorflow as tf\n",
    "import apache_beam as beam\n",
    "print tf.__version__"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": false,
    "jupyter": {
     "outputs_hidden": false
    }
   },
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/usr/local/lib/python2.7/dist-packages/simplejson/encoder.py:291: DeprecationWarning: Interpreting naive datetime as local 2017-10-15 18:39:50.945296. Please add timezone info to timestamps.\n",
      "  chunks = self.iterencode(o, _one_shot=True)\n"
     ]
    }
   ],
   "source": [
    "# change these to try this notebook out\n",
    "BUCKET = 'asl-ml-immersion-temp'\n",
    "PROJECT = 'asl-ml-immersion'\n",
    "REGION = 'us-central1'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": false,
    "jupyter": {
     "outputs_hidden": false
    }
   },
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/usr/local/lib/python2.7/dist-packages/simplejson/encoder.py:291: DeprecationWarning: Interpreting naive datetime as local 2017-10-15 18:39:52.072607. Please add timezone info to timestamps.\n",
      "  chunks = self.iterencode(o, _one_shot=True)\n"
     ]
    }
   ],
   "source": [
    "import os\n",
    "os.environ['BUCKET'] = BUCKET\n",
    "os.environ['PROJECT'] = PROJECT\n",
    "os.environ['REGION'] = REGION"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": false,
    "jupyter": {
     "outputs_hidden": false
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Updated property [core/project].\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/usr/local/lib/python2.7/dist-packages/simplejson/encoder.py:291: DeprecationWarning: Interpreting naive datetime as local 2017-10-15 18:39:53.495436. Please add timezone info to timestamps.\n",
      "  chunks = self.iterencode(o, _one_shot=True)\n"
     ]
    }
   ],
   "source": [
    "!gcloud config set project $PROJECT"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": false,
    "jupyter": {
     "outputs_hidden": false
    }
   },
   "outputs": [],
   "source": [
    "%%bash\n",
    "if ! gcloud storage ls | grep -q gs://${BUCKET}/; then\n",    "  gcloud storage buckets create --location=${REGION} gs://${BUCKET}\n",    "fi"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<h2> Save the query from earlier </h2>\n",
    "\n",
    "The data is natality data (record of births in the US). My goal is to predict the baby's weight given a number of factors about the pregnancy and the baby's mother.  Later, we will want to split the data into training and eval datasets. The hash of the year-month will be used for that."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": false,
    "jupyter": {
     "outputs_hidden": false
    }
   },
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/usr/local/lib/python2.7/dist-packages/simplejson/encoder.py:291: DeprecationWarning: Interpreting naive datetime as local 2017-10-15 18:39:57.591717. Please add timezone info to timestamps.\n",
      "  chunks = self.iterencode(o, _one_shot=True)\n"
     ]
    }
   ],
   "source": [
    "query=\"\"\"\n",
    "SELECT\n",
    "  weight_pounds,\n",
    "  is_male,\n",
    "  mother_age,\n",
    "  mother_race,\n",
    "  plurality,\n",
    "  gestation_weeks,\n",
    "  mother_married,\n",
    "  ever_born,\n",
    "  cigarette_use,\n",
    "  alcohol_use,\n",
    "  FARM_FINGERPRINT(CONCAT(CAST(YEAR AS STRING), CAST(month AS STRING))) AS hashmonth\n",
    "FROM\n",
    "  publicdata.samples.natality\n",
    "WHERE year > 2000\n",
    "\"\"\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": false,
    "jupyter": {
     "outputs_hidden": false
    }
   },
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>weight_pounds</th>\n",
       "      <th>is_male</th>\n",
       "      <th>mother_age</th>\n",
       "      <th>mother_race</th>\n",
       "      <th>plurality</th>\n",
       "      <th>gestation_weeks</th>\n",
       "      <th>mother_married</th>\n",
       "      <th>ever_born</th>\n",
       "      <th>cigarette_use</th>\n",
       "      <th>alcohol_use</th>\n",
       "      <th>hashmonth</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>6.311835</td>\n",
       "      <td>False</td>\n",
       "      <td>18</td>\n",
       "      <td>3</td>\n",
       "      <td>1</td>\n",
       "      <td>39.0</td>\n",
       "      <td>False</td>\n",
       "      <td>2</td>\n",
       "      <td>None</td>\n",
       "      <td>False</td>\n",
       "      <td>-7146494315947640619</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>7.500126</td>\n",
       "      <td>False</td>\n",
       "      <td>19</td>\n",
       "      <td>1</td>\n",
       "      <td>1</td>\n",
       "      <td>38.0</td>\n",
       "      <td>True</td>\n",
       "      <td>1</td>\n",
       "      <td>None</td>\n",
       "      <td>False</td>\n",
       "      <td>-774501970389208065</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>7.936641</td>\n",
       "      <td>False</td>\n",
       "      <td>19</td>\n",
       "      <td>1</td>\n",
       "      <td>1</td>\n",
       "      <td>40.0</td>\n",
       "      <td>False</td>\n",
       "      <td>1</td>\n",
       "      <td>None</td>\n",
       "      <td>False</td>\n",
       "      <td>8904940584331855459</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>3</th>\n",
       "      <td>6.999677</td>\n",
       "      <td>False</td>\n",
       "      <td>19</td>\n",
       "      <td>1</td>\n",
       "      <td>1</td>\n",
       "      <td>39.0</td>\n",
       "      <td>False</td>\n",
       "      <td>3</td>\n",
       "      <td>None</td>\n",
       "      <td>False</td>\n",
       "      <td>-7170969733900686954</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>4</th>\n",
       "      <td>9.169025</td>\n",
       "      <td>False</td>\n",
       "      <td>19</td>\n",
       "      <td>1</td>\n",
       "      <td>1</td>\n",
       "      <td>40.0</td>\n",
       "      <td>False</td>\n",
       "      <td>2</td>\n",
       "      <td>None</td>\n",
       "      <td>False</td>\n",
       "      <td>-7170969733900686954</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "   weight_pounds is_male  mother_age  mother_race  plurality  gestation_weeks  \\\n",
       "0       6.311835   False          18            3          1             39.0   \n",
       "1       7.500126   False          19            1          1             38.0   \n",
       "2       7.936641   False          19            1          1             40.0   \n",
       "3       6.999677   False          19            1          1             39.0   \n",
       "4       9.169025   False          19            1          1             40.0   \n",
       "\n",
       "  mother_married  ever_born cigarette_use alcohol_use            hashmonth  \n",
       "0          False          2          None       False -7146494315947640619  \n",
       "1           True          1          None       False  -774501970389208065  \n",
       "2          False          1          None       False  8904940584331855459  \n",
       "3          False          3          None       False -7170969733900686954  \n",
       "4          False          2          None       False -7170969733900686954  "
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/usr/local/lib/python2.7/dist-packages/simplejson/encoder.py:291: DeprecationWarning: Interpreting naive datetime as local 2017-10-15 18:39:58.567890. Please add timezone info to timestamps.\n",
      "  chunks = self.iterencode(o, _one_shot=True)\n"
     ]
    }
   ],
   "source": [
    "import google.datalab.bigquery as bq\n",
    "df = bq.Query(query + \" LIMIT 100\").execute().result().to_dataframe()\n",
    "df.head()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<h2> Create ML dataset using tf.transform and Dataflow </h2>\n",
    "<p>\n",
    "Let's use Cloud Dataflow to read in the BigQuery data and write it out as CSV files. Along the way, let's use tf.transform to do scaling and transforming. Using tf.transform allows us to save the metadata to ensure that the appropriate transformations get carried out during prediction as well.\n",
    "<p>\n",
    "Note that after you launch this, the notebook won't show you progress. Go to the GCP webconsole to the Dataflow section and monitor the running job. It took about <b>30 minutes</b> for me. If you wish to continue without doing this step, you can copy my preprocessed output:\n",
    "<pre>\n",
    "gcloud storage cp --recursive gs://asl-ml-immersion/babyweight/preproc_tft gs://your-bucket/\n",    "</pre>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "collapsed": false,
    "jupyter": {
     "outputs_hidden": false
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "tensorflow-transform==0.1.10\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/usr/local/lib/python2.7/dist-packages/simplejson/encoder.py:291: DeprecationWarning: Interpreting naive datetime as local 2017-10-15 18:40:12.436963. Please add timezone info to timestamps.\n",
      "  chunks = self.iterencode(o, _one_shot=True)\n"
     ]
    }
   ],
   "source": [
    "%bash\n",
    "# makes sure that the version of tensorflow and tensorflow_transform that we are using is on the worker machines\n",
    "pip freeze | grep tensorflow-transform > requirements.txt\n",
    "cat requirements.txt"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "collapsed": false,
    "jupyter": {
     "outputs_hidden": false
    }
   },
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/usr/local/lib/python2.7/dist-packages/scipy/ndimage/measurements.py:36: RuntimeWarning: numpy.dtype size changed, may indicate binary incompatibility. Expected 96, got 88\n",
      "  from . import _ni_label\n",
      "/usr/local/lib/python2.7/dist-packages/scipy/ndimage/measurements.py:36: RuntimeWarning: numpy.ufunc size changed, may indicate binary incompatibility. Expected 192, got 176\n",
      "  from . import _ni_label\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Launching Dataflow job preprocess-babyweight-features-171015-184021 ... hang on\n",
      "WARNING:tensorflow:From /usr/local/lib/python2.7/dist-packages/tensorflow_transform/mappers.py:305: string_to_index_table_from_tensor (from tensorflow.contrib.lookup.lookup_ops) is deprecated and will be removed after 2017-04-10.\n",
      "Instructions for updating:\n",
      "Use `index_table_from_tensor`.\n",
      "INFO:tensorflow:Assets added to graph.\n",
      "INFO:tensorflow:No assets to write.\n",
      "INFO:tensorflow:SavedModel written to: gs://asl-ml-immersion-temp/babyweight/preproc_tft/tmp/tftransform_tmp/d9c8204d8ebc439caa6853854675ee09/saved_model.pb\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/usr/local/lib/python2.7/dist-packages/apache_beam/coders/typecoders.py:135: UserWarning: Using fallback coder for typehint: Any.\n",
      "  warnings.warn('Using fallback coder for typehint: %r.' % typehint)\n",
      "/usr/local/lib/python2.7/dist-packages/apache_beam/coders/typecoders.py:135: UserWarning: Using fallback coder for typehint: Union[float, int, long, str, unicode].\n",
      "  warnings.warn('Using fallback coder for typehint: %r.' % typehint)\n",
      "/usr/local/lib/python2.7/dist-packages/apache_beam/coders/typecoders.py:135: UserWarning: Using fallback coder for typehint: <type 'NoneType'>.\n",
      "  warnings.warn('Using fallback coder for typehint: %r.' % typehint)\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "INFO:tensorflow:Assets added to graph.\n",
      "INFO:tensorflow:No assets to write.\n",
      "INFO:tensorflow:SavedModel written to: gs://asl-ml-immersion-temp/babyweight/preproc_tft/tmp/tftransform_tmp/32d3edf033e94bbcb99757bf94791270/saved_model.pb\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/usr/local/lib/python2.7/dist-packages/apache_beam/coders/typecoders.py:135: UserWarning: Using fallback coder for typehint: Dict[str, Union[SparseTensorValue, ndarray]].\n",
      "  warnings.warn('Using fallback coder for typehint: %r.' % typehint)\n",
      "/usr/local/lib/python2.7/dist-packages/apache_beam/coders/typecoders.py:135: UserWarning: Using fallback coder for typehint: List[Any].\n",
      "  warnings.warn('Using fallback coder for typehint: %r.' % typehint)\n"
     ]
    },
    {
     "ename": "CalledProcessError",
     "evalue": "Command '['/usr/bin/python', '-m', 'pip', 'install', '--download', '/tmp/dataflow-requirements-cache', '-r', 'requirements.txt', '--no-binary', ':all:']' returned non-zero exit status 1",
     "output_type": "error",
     "traceback": [
      "\u001b[0;31m\u001b[0m",
      "\u001b[0;31mCalledProcessError\u001b[0mTraceback (most recent call last)",
      "\u001b[0;32m<ipython-input-9-e25a9a22b6c2>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m\n\u001b[1;32m    145\u001b[0m   \u001b[0mjob\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mp\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mrun\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    146\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 147\u001b[0;31m \u001b[0mpreprocess\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mquery\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0min_test_mode\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mFalse\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
      "\u001b[0;32m<ipython-input-9-e25a9a22b6c2>\u001b[0m in \u001b[0;36mpreprocess\u001b[0;34m(query, in_test_mode)\u001b[0m\n\u001b[1;32m    141\u001b[0m       _ = (transform_fn\n\u001b[1;32m    142\u001b[0m            \u001b[0;34m|\u001b[0m \u001b[0;34m'WriteTransformFn'\u001b[0m \u001b[0;34m>>\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 143\u001b[0;31m            transform_fn_io.WriteTransformFn(os.path.join(OUTPUT_DIR, 'metadata')))\n\u001b[0m\u001b[1;32m    144\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    145\u001b[0m   \u001b[0mjob\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mp\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mrun\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.pyc\u001b[0m in \u001b[0;36m__exit__\u001b[0;34m(self, exc_type, exc_val, exc_tb)\u001b[0m\n\u001b[1;32m    333\u001b[0m   \u001b[0;32mdef\u001b[0m \u001b[0m__exit__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mexc_type\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mexc_val\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mexc_tb\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    334\u001b[0m     \u001b[0;32mif\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0mexc_type\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 335\u001b[0;31m       \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mrun\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mwait_until_finish\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    336\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    337\u001b[0m   \u001b[0;32mdef\u001b[0m \u001b[0mvisit\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mvisitor\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.pyc\u001b[0m in \u001b[0;36mrun\u001b[0;34m(self, test_runner_api)\u001b[0m\n\u001b[1;32m    326\u001b[0m       \u001b[0;32mfinally\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    327\u001b[0m         \u001b[0mshutil\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mrmtree\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtmpdir\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 328\u001b[0;31m     \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mrunner\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mrun\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    329\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    330\u001b[0m   \u001b[0;32mdef\u001b[0m \u001b[0m__enter__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/dataflow_runner.pyc\u001b[0m in \u001b[0;36mrun\u001b[0;34m(self, pipeline)\u001b[0m\n\u001b[1;32m    281\u001b[0m     \u001b[0;31m# Create the job\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    282\u001b[0m     result = DataflowPipelineResult(\n\u001b[0;32m--> 283\u001b[0;31m         self.dataflow_client.create_job(self.job), self)\n\u001b[0m\u001b[1;32m    284\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    285\u001b[0m     \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_metrics\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mDataflowMetrics\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mdataflow_client\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mresult\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mjob\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/usr/local/lib/python2.7/dist-packages/apache_beam/utils/retry.pyc\u001b[0m in \u001b[0;36mwrapper\u001b[0;34m(*args, **kwargs)\u001b[0m\n\u001b[1;32m    166\u001b[0m       \u001b[0;32mwhile\u001b[0m \u001b[0mTrue\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    167\u001b[0m         \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 168\u001b[0;31m           \u001b[0;32mreturn\u001b[0m \u001b[0mfun\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    169\u001b[0m         \u001b[0;32mexcept\u001b[0m \u001b[0mException\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0mexn\u001b[0m\u001b[0;34m:\u001b[0m  \u001b[0;31m# pylint: disable=broad-except\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    170\u001b[0m           \u001b[0;32mif\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0mretry_filter\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mexn\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/internal/apiclient.pyc\u001b[0m in \u001b[0;36mcreate_job\u001b[0;34m(self, job)\u001b[0m\n\u001b[1;32m    421\u001b[0m   \u001b[0;32mdef\u001b[0m \u001b[0mcreate_job\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mjob\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    422\u001b[0m     \u001b[0;34m\"\"\"Creates job description. May stage and/or submit for remote execution.\"\"\"\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 423\u001b[0;31m     \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcreate_job_description\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mjob\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    424\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    425\u001b[0m     \u001b[0;31m# Stage and submit the job when necessary\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/internal/apiclient.pyc\u001b[0m in \u001b[0;36mcreate_job_description\u001b[0;34m(self, job)\u001b[0m\n\u001b[1;32m    444\u001b[0m     \u001b[0;34m\"\"\"Creates a job described by the workflow proto.\"\"\"\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    445\u001b[0m     resources = dependency.stage_job_resources(\n\u001b[0;32m--> 446\u001b[0;31m         job.options, file_copy=self._gcs_file_copy)\n\u001b[0m\u001b[1;32m    447\u001b[0m     job.proto.environment = Environment(\n\u001b[1;32m    448\u001b[0m         \u001b[0mpackages\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mresources\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0moptions\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mjob\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0moptions\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/internal/dependency.pyc\u001b[0m in \u001b[0;36mstage_job_resources\u001b[0;34m(options, file_copy, build_setup_args, temp_dir, populate_requirements_cache)\u001b[0m\n\u001b[1;32m    325\u001b[0m       \u001b[0mos\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmakedirs\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mrequirements_cache_path\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    326\u001b[0m     populate_requirements_cache(\n\u001b[0;32m--> 327\u001b[0;31m         setup_options.requirements_file, requirements_cache_path)\n\u001b[0m\u001b[1;32m    328\u001b[0m     \u001b[0;32mfor\u001b[0m \u001b[0mpkg\u001b[0m \u001b[0;32min\u001b[0m  \u001b[0mglob\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mglob\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mos\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mjoin\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mrequirements_cache_path\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m'*'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    329\u001b[0m       file_copy(pkg, FileSystems.join(google_cloud_options.staging_location,\n",
      "\u001b[0;32m/usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/internal/dependency.pyc\u001b[0m in \u001b[0;36m_populate_requirements_cache\u001b[0;34m(requirements_file, cache_dir)\u001b[0m\n\u001b[1;32m    259\u001b[0m       '--no-binary', ':all:']\n\u001b[1;32m    260\u001b[0m   \u001b[0mlogging\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0minfo\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'Executing command: %s'\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mcmd_args\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 261\u001b[0;31m   \u001b[0mprocesses\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcheck_call\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcmd_args\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    262\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    263\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/usr/local/lib/python2.7/dist-packages/apache_beam/utils/processes.pyc\u001b[0m in \u001b[0;36mcheck_call\u001b[0;34m(*args, **kwargs)\u001b[0m\n\u001b[1;32m     42\u001b[0m   \u001b[0;32mif\u001b[0m \u001b[0mforce_shell\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m     43\u001b[0m     \u001b[0mkwargs\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;34m'shell'\u001b[0m\u001b[0;34m]\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mTrue\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 44\u001b[0;31m   \u001b[0;32mreturn\u001b[0m \u001b[0msubprocess\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcheck_call\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m     45\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m     46\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/usr/lib/python2.7/subprocess.pyc\u001b[0m in \u001b[0;36mcheck_call\u001b[0;34m(*popenargs, **kwargs)\u001b[0m\n\u001b[1;32m    539\u001b[0m         \u001b[0;32mif\u001b[0m \u001b[0mcmd\u001b[0m \u001b[0;32mis\u001b[0m \u001b[0mNone\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    540\u001b[0m             \u001b[0mcmd\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mpopenargs\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 541\u001b[0;31m         \u001b[0;32mraise\u001b[0m \u001b[0mCalledProcessError\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mretcode\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mcmd\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    542\u001b[0m     \u001b[0;32mreturn\u001b[0m \u001b[0;36m0\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    543\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;31mCalledProcessError\u001b[0m: Command '['/usr/bin/python', '-m', 'pip', 'install', '--download', '/tmp/dataflow-requirements-cache', '-r', 'requirements.txt', '--no-binary', ':all:']' returned non-zero exit status 1"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/usr/local/lib/python2.7/dist-packages/simplejson/encoder.py:291: DeprecationWarning: Interpreting naive datetime as local 2017-10-15 18:40:19.177949. Please add timezone info to timestamps.\n",
      "  chunks = self.iterencode(o, _one_shot=True)\n"
     ]
    }
   ],
   "source": [
    "import datetime\n",
    "import apache_beam as beam\n",
    "import tensorflow_transform as tft\n",
    "from tensorflow_transform.beam import impl as beam_impl\n",
    "\n",
    "def preprocess_tft(inputs):\n",
    "    import copy\n",
    "    import numpy as np\n",
    "    def center(x):\n",
    "          return x - tft.mean(x)\n",
    "    result = copy.copy(inputs) # shallow copy\n",
    "    result['mother_age_tft'] = center(inputs['mother_age'])\n",
    "    result['gestation_weeks_centered'] = tft.scale_to_0_1(inputs['gestation_weeks'])\n",
    "    result['mother_race_tft'] = tft.string_to_int(inputs['mother_race'])\n",
    "    return result\n",
    "    #return inputs\n",
    "\n",
    "def cleanup(rowdict):\n",
    "    import copy, hashlib\n",
    "    CSV_COLUMNS = 'weight_pounds,is_male,mother_age,mother_race,plurality,gestation_weeks,mother_married,cigarette_use,alcohol_use'.split(',')\n",
    "    STR_COLUMNS = 'key,is_male,mother_race,mother_married,cigarette_use,alcohol_use'.split(',')\n",
    "    FLT_COLUMNS = 'weight_pounds,mother_age,plurality,gestation_weeks'.split(',')\n",
    "    \n",
    "    # add any missing columns, and correct the types\n",
    "    def tofloat(value, ifnot):\n",
    "      try:\n",
    "        return float(value)\n",
    "      except (ValueError, TypeError):\n",
    "        return ifnot\n",
    "\n",
    "    result = {\n",
    "      k : str(rowdict[k]) if k in rowdict else 'None' for k in STR_COLUMNS\n",
    "    }\n",
    "    result.update({\n",
    "        k : tofloat(rowdict[k], -99) if k in rowdict else -99 for k in FLT_COLUMNS\n",
    "      })\n",
    "    \n",
    "    # modify opaque numeric race code into human-readable data\n",
    "    races = dict(zip([1,2,3,4,5,6,7,18,28,39,48],\n",
    "                     ['White', 'Black', 'American Indian', 'Chinese', \n",
    "                      'Japanese', 'Hawaiian', 'Filipino',\n",
    "                      'Asian Indian', 'Korean', 'Samaon', 'Vietnamese'])) \n",
    "    if 'mother_race' in rowdict and rowdict['mother_race'] in races:\n",
    "      result['mother_race'] = races[rowdict['mother_race']]\n",
    "    else:\n",
    "      result['mother_race'] = 'Unknown'    \n",
    "    \n",
    "    # cleanup: write out only the data we that we want to train on\n",
    "    if result['weight_pounds'] > 0 and result['mother_age'] > 0 and result['gestation_weeks'] > 0 and result['plurality'] > 0:\n",
    "      data = ','.join([str(result[k]) for k in CSV_COLUMNS])\n",
    "      result['key'] = hashlib.sha224(data).hexdigest()\n",
    "      yield result \n",
    "  \n",
    "def preprocess(query, in_test_mode):\n",
    "  import os\n",
    "  import os.path\n",
    "  import tempfile\n",
    "  from apache_beam.io import tfrecordio\n",
    "  from tensorflow_transform.coders import example_proto_coder\n",
    "  from tensorflow_transform.tf_metadata import dataset_metadata\n",
    "  from tensorflow_transform.tf_metadata import dataset_schema\n",
    "  from tensorflow_transform.beam.tft_beam_io import transform_fn_io\n",
    "\n",
    "  job_name = 'preprocess-babyweight-features' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')    \n",
    "  if in_test_mode:\n",
    "    import shutil\n",
    "    print 'Launching local job ... hang on'\n",
    "    OUTPUT_DIR = './preproc_tft'\n",
    "    shutil.rmtree(OUTPUT_DIR, ignore_errors=True)\n",
    "  else:\n",
    "    print 'Launching Dataflow job {} ... hang on'.format(job_name)\n",
    "    OUTPUT_DIR = 'gs://{0}/babyweight/preproc_tft/'.format(BUCKET)\n",
    "    import subprocess\n",
"    subprocess.call('gcloud storage rm --recursive {}'.format(OUTPUT_DIR).split())\n",    "    \n",
    "  options = {\n",
    "    'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),\n",
    "    'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),\n",
    "    'job_name': job_name,\n",
    "    'project': PROJECT,\n",
    "    'max_num_workers': 24,\n",
    "    'teardown_policy': 'TEARDOWN_ALWAYS',\n",
    "    'no_save_main_session': True,\n",
    "    'requirements_file': 'requirements.txt'\n",
    "  }\n",
    "  opts = beam.pipeline.PipelineOptions(flags=[], **options)\n",
    "  if in_test_mode:\n",
    "    RUNNER = 'DirectRunner'\n",
    "  else:\n",
    "    RUNNER = 'DataflowRunner'\n",
    "\n",
    "  # set up metadata  \n",
    "  raw_data_schema = {\n",
    "    colname : dataset_schema.ColumnSchema(tf.string, [], dataset_schema.FixedColumnRepresentation())\n",
    "                   for colname in 'key,is_male,mother_race,mother_married,cigarette_use,alcohol_use'.split(',')\n",
    "  }\n",
    "  raw_data_schema.update({\n",
    "      colname : dataset_schema.ColumnSchema(tf.float32, [], dataset_schema.FixedColumnRepresentation())\n",
    "                   for colname in 'weight_pounds,mother_age,plurality,gestation_weeks'.split(',')\n",
    "    })\n",
    "  raw_data_metadata = dataset_metadata.DatasetMetadata(dataset_schema.Schema(raw_data_schema))\n",
    "\n",
    "  def read_rawdata(p, step, test_mode):\n",
    "    if step == 'train':\n",
    "        selquery = 'SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 4)) < 3'.format(query)\n",
    "    else:\n",
    "        selquery = 'SELECT * FROM ({}) WHERE ABS(MOD(hashmonth, 4)) = 3'.format(query)\n",
    "    if in_test_mode:\n",
    "        selquery = selquery + ' LIMIT 100'\n",
    "    #print 'Processing {} data from {}'.format(step, selquery)\n",
    "    return (p \n",
    "          | '{}_read'.format(step) >> beam.io.Read(beam.io.BigQuerySource(query=selquery, use_standard_sql=True))\n",
    "          | '{}_cleanup'.format(step) >> beam.FlatMap(cleanup)\n",
    "                   )\n",
    "  \n",
    "  # run Beam  \n",
    "  with beam.Pipeline(RUNNER, options=opts) as p:\n",
    "    with beam_impl.Context(temp_dir=os.path.join(OUTPUT_DIR, 'tmp')):\n",
    "\n",
    "      # analyze and transform training       \n",
    "      raw_data = read_rawdata(p, 'train', in_test_mode)\n",
    "      raw_dataset = (raw_data, raw_data_metadata)\n",
    "      transformed_dataset, transform_fn = (\n",
    "          raw_dataset | beam_impl.AnalyzeAndTransformDataset(preprocess_tft))\n",
    "      transformed_data, transformed_metadata = transformed_dataset\n",
    "      _ = transformed_data | 'WriteTrainData' >> tfrecordio.WriteToTFRecord(\n",
    "          os.path.join(OUTPUT_DIR, 'train'),\n",
    "          coder=example_proto_coder.ExampleProtoCoder(\n",
    "              transformed_metadata.schema))\n",
    "      \n",
    "      # transform eval data\n",
    "      raw_test_data = read_rawdata(p, 'eval', in_test_mode)\n",
    "      raw_test_dataset = (raw_test_data, raw_data_metadata)\n",
    "      transformed_test_dataset = (\n",
    "          (raw_test_dataset, transform_fn) | beam_impl.TransformDataset())\n",
    "      transformed_test_data, _ = transformed_test_dataset\n",
    "      _ = transformed_test_data | 'WriteTestData' >> tfrecordio.WriteToTFRecord(\n",
    "          os.path.join(OUTPUT_DIR, 'eval'),\n",
    "          coder=example_proto_coder.ExampleProtoCoder(\n",
    "              transformed_metadata.schema))\n",
    "      _ = (transform_fn\n",
    "           | 'WriteTransformFn' >>\n",
    "           transform_fn_io.WriteTransformFn(os.path.join(OUTPUT_DIR, 'metadata')))\n",
    "\n",
    "  job = p.run()\n",
    "  \n",
    "preprocess(query, in_test_mode=False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false,
    "jupyter": {
     "outputs_hidden": false
    }
   },
   "outputs": [],
   "source": [
    "%bash\n",
    "gcloud storage ls gs://${BUCKET}/babyweight/preproc_tft/*-00000*"   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Copyright 2017 Google Inc. Licensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.13"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
